Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/sentinel_connector.py (100 lines of code) (raw):
import requests
import datetime
import logging
import json
import hashlib
import hmac
import base64
import time
class AzureSentinelConnector:
def __init__(self, log_analytics_uri, workspace_id, shared_key, log_type, queue_size=200, queue_size_bytes=25 * (2**20)):
self.log_analytics_uri = log_analytics_uri
self.workspace_id = workspace_id
self.shared_key = shared_key
self.log_type = log_type
self.queue_size = queue_size
self.bulks_number = 1
self.queue_size_bytes = queue_size_bytes
self._queue = []
self._bulks_list = []
self.successfull_sent_events_number = 0
def send(self, event):
self._queue.append(event)
if len(self._queue) >= self.queue_size:
self.flush(force=False)
def flush(self, force=True):
self._bulks_list.append(self._queue)
if force:
self._flush_bulks()
else:
if len(self._bulks_list) >= self.bulks_number:
self._flush_bulks()
self._queue = []
def _flush_bulks(self):
for queue in self._bulks_list:
if queue:
queue_list = self._split_big_request(queue)
for q in queue_list:
self._post_data(self.workspace_id, self.shared_key, q, self.log_type)
self._bulks_list = []
def is_empty(self):
return not self._queue and not self._bulks_list
def __enter__(self):
pass
def __exit__(self, type, value, traceback):
self.flush()
def _build_signature(self, workspace_id, shared_key, date, content_length, method, content_type, resource):
x_headers = 'x-ms-date:' + date
string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
decoded_key = base64.b64decode(shared_key)
encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
authorization = "SharedKey {}:{}".format(workspace_id, encoded_hash)
return authorization
def _post_data(self, workspace_id, shared_key, body, log_type):
events_number = len(body)
body = json.dumps(body)
method = 'POST'
content_type = 'application/json'
resource = '/api/logs'
rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
content_length = len(body)
signature = self._build_signature(workspace_id, shared_key, rfc1123date, content_length, method, content_type, resource)
uri = self.log_analytics_uri + resource + '?api-version=2016-04-01'
headers = {
'content-type': content_type,
'Authorization': signature,
'Log-Type': log_type,
'x-ms-date': rfc1123date
}
try_number = 1
while True:
try:
self._make_request(uri, body, headers)
except Exception as err:
if try_number < 3:
logging.warning('Error while sending data to Azure Sentinel. Try number: {}. {}'.format(try_number, err))
time.sleep(try_number)
try_number += 1
else:
logging.error(str(err))
self.failed_sent_events_number += events_number
raise err
else:
logging.info('{} events have been successfully sent to Azure Sentinel'.format(events_number))
self.successfull_sent_events_number += events_number
break
def _make_request(self, uri, body, headers):
response = requests.post(uri, data=body, headers=headers)
if not (200 <= response.status_code <= 299):
raise Exception("Error during sending events to Azure Sentinel. Response code: {}".format(response.status))
def _check_size(self, queue):
data_bytes_len = len(json.dumps(queue).encode())
return data_bytes_len < self.queue_size_bytes
def _split_big_request(self, queue):
if self._check_size(queue):
return [queue]
else:
middle = int(len(queue) / 2)
queues_list = [queue[:middle], queue[middle:]]
return self._split_big_request(queues_list[0]) + self._split_big_request(queues_list[1])